Ingest samples older than 1h for block store #2819
Conversation
Should probably clarify in the title that this is for the block store - the chunks storage has ingested old samples forever.
Thanks @codesome for working on this! I did a very quick first pass and left some design feedback. My suggestion: try to keep it as simple and clean as possible. It's fine if you need to submit preliminary refactorings to existing code to simplify this PR, but let's try to come up with a clean design.
Please also remember:
- Add a CHANGELOG entry
- I would allow disabling backfilling by setting a 0 value for the "max age" (left a dedicated comment)
Limits on the additional TSDB
I would leave this outside of this PR. It's fine to add them separately to keep this PR a bit smaller.
Force-pushed from 225a602 to fd1fa7f
The code looks more complex than I'd like it to be. I am currently writing more unit tests, and once I am satisfied with that, I will spend some time simplifying the code wherever possible before moving on to manual tests.
Another partial review, sorry. I think this logic is too complicated. I stopped reviewing it because I'm wondering whether all this complexity is worth it to fix the issue we're trying to fix. Let's talk offline.
Things I would like to discuss (don't jump on coding it immediately):
- The transfer doesn't support backfill TSDBs. I'm fine with that (I believe we shouldn't support it), but it made me wonder whether we could simplify the shutdown procedure and actually always snapshot and ship backfill blocks to the storage at shutdown.
@@ -146,6 +147,8 @@ type TSDBConfig struct {
	StripeSize int `yaml:"stripe_size"`
	WALCompressionEnabled bool `yaml:"wal_compression_enabled"`
	FlushBlocksOnShutdown bool `yaml:"flush_blocks_on_shutdown"`
	BackfillDir string `yaml:"backfill_dir"`
	BackfillLimit time.Duration `yaml:"backfill_limit"`
The CLI flag is called backfill-max-age while the YAML config option is backfill_limit. We should keep them consistent. Between the two, I believe backfill_max_age is clearer. I would rename the BackfillLimit variable accordingly.
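For illustration only, the renamed config option could look like this (a sketch; the CLI flag would stay backfill-max-age):

	BackfillMaxAge time.Duration `yaml:"backfill_max_age"`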
@@ -119,10 +123,42 @@ func (u *userTSDB) setLastUpdate(t time.Time) {
	u.lastUpdate.Store(t.Unix())
}

func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) {
	b, err := ioutil.ReadFile(filepath.Join(u.Dir(), shipper.MetaFilename))
Two things:
- I would use shipper.ReadMetaFile() to simplify this function.
- If the file does not exist, it should return an empty list of block IDs and no error (it's an expected condition when no block has been shipped yet).
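A minimal sketch of what the simplified function could look like, assuming shipper.ReadMetaFile() and the Meta.Uploaded field from the Thanos shipper package (with os, github.com/oklog/ulid, github.com/pkg/errors and the shipper package imported), and treating a missing meta file as "nothing shipped yet":

	func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) {
		// ReadMetaFile loads the shipper meta file from the TSDB directory.
		meta, err := shipper.ReadMetaFile(u.Dir())
		if err != nil {
			if os.IsNotExist(errors.Cause(err)) {
				// No meta file yet: no block has been shipped, which is not an error.
				return nil, nil
			}
			return nil, errors.Wrap(err, "read shipper meta file")
		}
		return meta.Uploaded, nil
	}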
@@ -274,6 +311,11 @@ func (i *Ingester) startingV2(ctx context.Context) error {
		return errors.Wrap(err, "opening existing TSDBs")
	}

	// Scan and open backfill TSDB's that already exist on disk.
	if err := i.openExistingBackfillTSDB(context.Background()); err != nil {
You should pass ctx, not context.Background().
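That is, the call above would become something like this (a sketch; the wrap message mirrors the one used for the regular TSDBs):

	if err := i.openExistingBackfillTSDB(ctx); err != nil {
		return errors.Wrap(err, "opening existing backfill TSDBs")
	}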
}

func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
	level.Info(util.Logger).Log("msg", "opening existing TSDBs")
Suggested change:
-	level.Info(util.Logger).Log("msg", "opening existing TSDBs")
+	level.Info(util.Logger).Log("msg", "opening existing backfill TSDBs")
}

	userID := u.Name()
	userPath := filepath.Join(i.cfg.BlocksStorageConfig.TSDB.BackfillDir, userID)
I guess you've added the function BackfillBlocksDir() on the config specifically for this 😉
		buckets.Unlock()
	}
	i.TSDBState.backfillDBs.tsdbsMtx.Unlock()
At this point, we should log the success/error similarly to what we do in openExistingTSDB()
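Something along these lines, assuming openExistingTSDB() follows the usual log-and-return pattern (a sketch only):

	if err != nil {
		level.Error(util.Logger).Log("msg", "error while opening existing backfill TSDBs", "err", err)
		return err
	}
	level.Info(util.Logger).Log("msg", "successfully opened existing backfill TSDBs")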
	wg.Add(1)
	go func() {
		defer wg.Done()
		i.closeAllBackfillTSDBs()
	}()
I think we should do it after i.userStatesMtx.Unlock(). Also, to further simplify the code, I would run wg.Wait() to wait until all user TSDBs are closed, and then call closeAllBackfillTSDBs() outside of any goroutine.
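In other words, the shutdown sequence could be flattened to something like this (a sketch using the names from the diff above):

	i.userStatesMtx.Unlock()

	// Wait for all per-user TSDBs to be closed, then close the backfill TSDBs
	// sequentially, outside of any goroutine.
	wg.Wait()
	i.closeAllBackfillTSDBs()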
	})
}

func (i *Ingester) runConcurrentBackfillWorkers(ctx context.Context, concurrency int, userFunc func(*userTSDB)) {
I think runConcurrentBackfillWorkers() and runConcurrentUserWorkers() could be generalised into a single function like this:
func runConcurrentUserWorkers(ctx context.Context, userIDs []string, concurrency int, userFunc func(userID string))
Then:
- You need two functions to generate the list of user IDs. We already have i.getTSDBUsers(); you could do the same for the backfill TSDBs.
- The callback passed to runConcurrentBackfillWorkers() takes a *userTSDB rather than a userID, but it looks easy to pass the userID to the callback and have the callback fetch the backfill TSDBs.
I understand the actual parallelisation is not exactly the same, but we should strive for simplicity first (without compromising correctness) and then we can always optimise it. As already mentioned a bunch of times, I expect backfilling to be an uncommon use case (I don't want to keep multiple TSDBs open for every tenant all the time).
This is a refactoring that could be done in a preliminary PR; see the sketch below.
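For reference, the generalised helper could look roughly like this (a sketch mirroring the existing worker-pool pattern, assuming context and sync are imported; not the final implementation):

	func runConcurrentUserWorkers(ctx context.Context, userIDs []string, concurrency int, userFunc func(userID string)) {
		wg := sync.WaitGroup{}
		queue := make(chan string)

		// Start a fixed number of workers consuming user IDs from the queue.
		for ix := 0; ix < concurrency; ix++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for userID := range queue {
					userFunc(userID)
				}
			}()
		}

		// Feed the queue, stopping early if the context is cancelled.
	sendLoop:
		for _, userID := range userIDs {
			select {
			case queue <- userID:
				// Enqueued.
			case <-ctx.Done():
				break sendLoop
			}
		}

		close(queue)
		wg.Wait()
	}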
	cause := errors.Cause(err)
	if cause == storage.ErrOutOfBounds &&
		i.cfg.BlocksStorageConfig.TSDB.BackfillLimit > 0 &&
		s.TimestampMs > db.Head().MaxTime()-i.cfg.BlocksStorageConfig.TSDB.BackfillLimit.Milliseconds() {
- Could there be any case where db.Head().MaxTime() is math.MinInt64?
- Should this be based on the head max time or on time.Now()? The "max age" could also be seen as relative to "now", and all other time-based limits we have are based on "now".
@@ -495,8 +552,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
	}

	// The error looks an issue on our side, so we should rollback
	if rollbackErr := app.Rollback(); rollbackErr != nil {
		level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
	var merr tsdb_errors.MultiError
Renaming merr to rollbackErr, as it was previously, could help clarify what this error is about.
Superseded by #3025
What this PR does:
This PR is currently a super early draft for ingesting samples older than 1h. Lots of TODOs.
The failing out-of-bounds test shows that we are currently appending out-of-bounds samples.
Which issue(s) this PR fixes:
Fixes #2366
Checklist
CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]
TODO in future PRs